CONUS404 Spatial Aggregation#

Using Xarray, GeoPandas and Sparse

Goal: Spatially aggregate a model data variable conservatively, i.e. by exactly partitioning each grid cell into the precise region boundaries.

Approach:

  • Represent both the original model grid and target grid as GeoPandas GeoSeries objects (with vector geometries)

  • Compute their area overlay and turn it into a sparse matrix

  • Perform matrix multiplication on the full Xarray dataset (with a time dimension)

It is quite fast and transparent.

%xmode minimal
import xarray as xr
import geopandas as gp
import pandas as pd
import sparse

import hvplot.pandas
import hvplot.xarray
import dask
import cf_xarray

from pynhd import NLDI, WaterData
import intake
import cartopy.crs as ccrs
Exception reporting mode: Minimal
def configure_cluster(machine):
    ''' Helper function to configure cluster
    '''
    if machine == 'denali':
        from dask.distributed import LocalCluster, Client
        cluster = LocalCluster(threads_per_worker=1)
        client = Client(cluster)
    
    elif machine == 'tallgrass':
        from dask.distributed import Client
        from dask_jobqueue import SLURMCluster
        cluster = SLURMCluster(queue='cpu', cores=1, 
                               walltime="01:00:00", account="woodshole",
                               interface='ib0', memory='6GB')
        cluster.adapt(maximum_jobs=10)
        client = Client(cluster)
        
    elif machine == 'local':
        import os
        import warnings
        from dask.distributed import LocalCluster, Client
        warnings.warn("Running locally can result in costly data transfers!\n")
        n_cores = os.cpu_count() # set to match your machine
        cluster = LocalCluster(threads_per_worker=n_cores)
        client = Client(cluster)
        
    elif machine in ['esip-qhub-gateway-v0.4']:   
        import sys, os
        sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
        import ebdpy as ebd
        aws_profile = 'esip-qhub'
        ebd.set_credentials(profile=aws_profile)

        aws_region = 'us-west-2'
        endpoint = f's3.{aws_region}.amazonaws.com'
        ebd.set_credentials(profile=aws_profile, region=aws_region, endpoint=endpoint)
        worker_max = 10
        client,cluster = ebd.start_dask_cluster(profile=aws_profile, worker_max=worker_max, 
                                              region=aws_region, use_existing_cluster=True,
                                              adaptive_scaling=False, wait_for_cluster=False, 
                                              worker_profile='Medium Worker', propagate_env=True)
        
    return client, cluster

Same notebook can be run on on-prem HPC or Cloud#

Here we start as Dask Cluster on the USGS HPC Denali system or on a JupyterHub deployment with Kubernetes running on Cloud

import os
if 'SLURM_CLUSTER_NAME' in os.environ:    # on prem
    dataset = 'conus404-hourly-onprem'
    machine = os.environ['SLURM_CLUSTER_NAME']
    client, cluster = configure_cluster(machine) 
else:                                     # on cloud
    dataset = 'conus404-hourly-cloud'     
    machine = 'esip-qhub-gateway-v0.4'
    client, cluster = configure_cluster(machine)
Region: us-west-2
No Cluster running.
Starting new cluster.
{}
Setting Cluster Environment Variable AWS_DEFAULT_REGION us-west-2
Setting Fixed Scaling workers=10
Reconnect client to clear cache
client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub):
https://nebari.esipfed.org/gateway/clusters/dev.2e772182b32447d9bcdecc80d0faa780/status
Propagating environment variables to workers
Using environment: users/users-pangeofu

Load the feature polygons (e.g. here catchment basins)#

Load with geopandas:

%%time
# USGS gage 01482100 Delaware River at Del Mem Bridge at Wilmington De
gage_id = '01482100'
nldi = NLDI()
del_basins = nldi.get_basins(gage_id)
huc12_basins = WaterData('huc12').bygeom(del_basins.geometry[0])
CPU times: user 1.47 s, sys: 173 ms, total: 1.64 s
Wall time: 24.3 s
regions_df = huc12_basins
region_name = 'name'

Check it:

regions_df.plot(column=region_name, figsize=(10,4))
<Axes: >
../_images/3176082a918577af17604d4be4f2a8f61b24134eff2c0a1bfd008f55e1fa7067.png

All geodataframes should have a coordinate reference system. This is important (and sometimes unfamiliar to users coming from the global climate world).

crs_orig = f'EPSG:{regions_df.crs.to_epsg()}'
crs_orig
'EPSG:4326'

Open the gridded model dataset (e.g. here CONUS404)#

url = 'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml'
# open the hytest data intake catalog
hytest_cat = intake.open_catalog(url)
list(hytest_cat)
['conus404-drb-eval-tutorial-catalog',
 'nhm-v1.0-daymet-catalog',
 'nhm-v1.1-c404-bc-catalog',
 'nhm-v1.1-gridmet-catalog',
 'conus404-catalog',
 'nwis-streamflow-usgs-gages-onprem',
 'nwis-streamflow-usgs-gages-cloud',
 'nwm21-streamflow-usgs-gages-onprem',
 'nwm21-streamflow-usgs-gages-cloud',
 'nwm21-streamflow-cloud',
 'nwm21-scores',
 'lcmap-cloud',
 'rechunking-tutorial-cloud']
# open the conus404 sub-catalog
cat = hytest_cat['conus404-catalog']
list(cat)
['conus404-hourly-onprem',
 'conus404-hourly-cloud',
 'conus404-hourly-osn',
 'conus404-daily-diagnostic-onprem',
 'conus404-daily-diagnostic-cloud',
 'conus404-daily-onprem',
 'conus404-daily-cloud',
 'conus404-monthly-onprem',
 'conus404-monthly-cloud']
ds = cat['conus404-daily-cloud'].to_dask()
x = 'x'  # projected x coordinate name
y = 'y'  # projected y coordinate name
ds.crs
<xarray.DataArray 'crs' ()>
[1 values with dtype=int32]
Attributes:
    grid_mapping_name:              lambert_conformal_conic
    latitude_of_projection_origin:  39.100006103515625
    longitude_of_central_meridian:  262.0999984741211
    semi_major_axis:                6370000.0
    semi_minor_axis:                6370000.0
    standard_parallel:              [30.0, 50.0]
crs_info = ds.crs
xx = ds.x.values
yy = ds.y.values
globe = ccrs.Globe(ellipse='sphere', semimajor_axis=6370000, semiminor_axis=6370000)
lcc = ccrs.LambertConformal(globe=globe,
                            central_longitude=crs_info.longitude_of_central_meridian, 
                            central_latitude=crs_info.latitude_of_projection_origin,
                            standard_parallels=crs_info.standard_parallel)
lcc_wkt = lcc.to_wkt()
regions_df = regions_df.to_crs(lcc_wkt)
bbox = tuple(regions_df.total_bounds)
bbox
(1751263.2290125347, 214989.48670458575, 1964829.8118029535, 619548.965931173)

Subset gridded model results to bounds of spatial dataframe to save on memory and computation. More useful when the regions_df is much smaller than the footprint of the gridded model

ds = ds.sel(x=slice(bbox[0],bbox[2]), y=slice(bbox[1],bbox[3]))

Now we extract just the horizontal grid information. The dataset has information about the lat and lon bounds of each cell, which we need to create the CONUS404 grid cell polygons.

var = 'T2'  # 2m Temp
var = 'PREC_ACC_NC' # precip
grid = ds[[var]].drop(['time', 'lon', 'lat', var]).reset_coords().load()
grid
<xarray.Dataset>
Dimensions:  (x: 54, y: 101)
Coordinates:
  * x        (x) float64 1.752e+06 1.756e+06 1.76e+06 ... 1.96e+06 1.964e+06
  * y        (y) float64 2.16e+05 2.2e+05 2.24e+05 ... 6.12e+05 6.16e+05
Data variables:
    *empty*
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...

Now we “stack” the data into a single 1D array. This is the first step towards transitioning to pandas.

grid = grid.cf.add_bounds([x, y])
points = grid.stack(point=(y,x))
points
<xarray.Dataset>
Dimensions:   (bounds: 2, point: 5454)
Coordinates:
    x_bounds  (bounds, point) float64 1.75e+06 1.754e+06 ... 1.962e+06 1.966e+06
    y_bounds  (bounds, point) float64 2.14e+05 2.14e+05 ... 6.18e+05 6.18e+05
  * point     (point) object MultiIndex
  * y         (point) float64 2.16e+05 2.16e+05 2.16e+05 ... 6.16e+05 6.16e+05
  * x         (point) float64 1.752e+06 1.756e+06 ... 1.96e+06 1.964e+06
Dimensions without coordinates: bounds
Data variables:
    *empty*
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...

This function creates geometries for a single pair of bounds. It is not fast, but it is fast enough here. Perhaps could be vectorized using pygeos…

from shapely.geometry import Polygon

def bounds_to_poly(x_bounds, y_bounds):
    return Polygon([
        (x_bounds[0], y_bounds[0]),
        (x_bounds[0], y_bounds[1]),
        (x_bounds[1], y_bounds[1]),
        (x_bounds[1], y_bounds[0])
    ])

We apply this function to each grid cell.

%%time
import numpy as np
boxes = xr.apply_ufunc(
    bounds_to_poly,
    points.x_bounds,
    points.y_bounds,
    input_core_dims=[("bounds",),  ("bounds",)],
    output_dtypes=[np.dtype('O')],
    vectorize=True
)
CPU times: user 64.5 ms, sys: 60.2 ms, total: 125 ms
Wall time: 80.1 ms

Finally, we convert to a GeoDataframe, specifying the projected CRS

grid_df= gp.GeoDataFrame(
    data={"geometry": boxes.values, "y": boxes[y], "x": boxes[x]},
    index=boxes.indexes["point"],
    crs=lcc_wkt
)

We will now transform to an area preserving projection. This is imporant because we want to do area-weighted regridding. Here we use the NSIDC EASE-Grid 2.0 grid for the Northern Hemisphere.

crs_area = "EPSG:6931"

regions_df = regions_df.to_crs(crs_area)
grid_df = grid_df.to_crs(crs_area)

grid_df.crs
<Projected CRS: EPSG:6931>
Name: WGS 84 / NSIDC EASE-Grid 2.0 North
Axis Info [cartesian]:
- X[south]: Easting (metre)
- Y[south]: Northing (metre)
Area of Use:
- name: Northern hemisphere.
- bounds: (-180.0, 0.0, 180.0, 90.0)
Coordinate Operation:
- name: US NSIDC EASE-Grid 2.0 North
- method: Lambert Azimuthal Equal Area
Datum: World Geodetic System 1984 ensemble
- Ellipsoid: WGS 84
- Prime Meridian: Greenwich

Key Step: Overlay the two geometries#

This is the magic of geopandas; it can calculate the overlap between the original grid and the regions. It is expensive because it has to compare 64800 grid boxes with 242 regions.

In this dataframe, the latitude and longitude values are from the grid, while all the other columns are from the regions.

overlay = grid_df.overlay(regions_df, keep_geom_type=True)

This is essentially already a sparse matrix mapping one grid space to the other. How sparse?

sparsity = len(overlay) / (len(grid_df) * len(regions_df))
sparsity
0.0024750477952660914

Let’s explore these overlays a little bit

We can verify that each basin’s area is preserved in the overlay operation.

overlay.geometry.area.groupby(overlay[region_name]).sum().nlargest(10)/1e6  # km2
name
Delaware Bay-Deep                         1073.773336
Ashokan Reservoir-Esopus Creek             205.798540
Lower Little Swatara Creek                 194.683255
Lake Wallenpaupack-Wallenpaupack Creek     180.104323
Bear Creek                                 176.743216
Bear Kill                                  174.877729
Upper Mahanoy Creek                        170.342913
Headwaters Paulins Kill River              167.072157
Black Creek                                160.267392
Snitz Creek-Quittapahilla Creek            159.798726
dtype: float64
overlay[overlay[region_name] == "Delaware Bay-Deep"].geometry.plot(edgecolor='k', aspect='equal')
<Axes: >
../_images/2a582d441b4c1538c7e150c7332b8a391d247742e8a93dcab1aaf61d6d5330f9.png
regions_df.geometry.area.groupby(regions_df[region_name]).sum().nlargest(10)
name
Delaware Bay-Deep                         1.073773e+09
Ashokan Reservoir-Esopus Creek            2.057985e+08
Lower Little Swatara Creek                1.946833e+08
Lake Wallenpaupack-Wallenpaupack Creek    1.801043e+08
Bear Creek                                1.767432e+08
Bear Kill                                 1.748777e+08
Upper Mahanoy Creek                       1.703429e+08
Headwaters Paulins Kill River             1.670722e+08
Black Creek                               1.602674e+08
Snitz Creek-Quittapahilla Creek           1.597987e+08
dtype: float64

Calculate the area fraction for each region#

This is another key step. This transform tells us how much of a country’s total area comes from each of the grid cells. This is accurate because we used an area-preserving CRS.

grid_cell_fraction = overlay.geometry.area.groupby(overlay[region_name]).transform(lambda x: x / x.sum())
grid_cell_fraction
0       0.006051
1       0.007286
2       0.000754
3       0.004720
4       0.014627
          ...   
6191    0.001910
6192    0.179560
6193    0.180305
6194    0.093585
6195    0.000193
Length: 6196, dtype: float64

We can verify that these all sum up to one.

grid_cell_fraction.groupby(overlay[region_name]).sum()
name
Alexauken Creek-Delaware River                       1.0
Allegheny Creek-Delaware River                       1.0
Angelica Creek-Schuylkill River                      1.0
Antietam Creek                                       1.0
Appenzell Creek                                      1.0
                                                    ... 
Wrangle Brook                                        1.0
Wright Brook-Headwaters West Brach Delaware River    1.0
Wright Creek-Lehigh River                            1.0
Wyomissing Creek                                     1.0
Yaleville Brook-Susquehanna River                    1.0
Length: 451, dtype: float64

Turn this into a sparse Xarray DataArray#

The first step is making a MultiIndex

multi_index = overlay.set_index([y, x, region_name]).index
df_weights = pd.DataFrame({"weights": grid_cell_fraction.values}, index=multi_index)
df_weights
weights
y x name
216039.100363 1.919902e+06 Delaware Bay-Deep 0.006051
1.923902e+06 Delaware Bay-Deep 0.007286
220039.100363 1.911902e+06 Delaware Bay-Deep 0.000754
1.915902e+06 Delaware Bay-Deep 0.004720
1.919902e+06 Delaware Bay-Deep 0.014627
... ... ... ...
616039.100363 1.859902e+06 Mine Kill 0.001910
1.863902e+06 Mine Kill 0.179560
1.867902e+06 Mine Kill 0.180305
1.871902e+06 Mine Kill 0.093585
1.875902e+06 Mine Kill 0.000193

6196 rows × 1 columns

We can bring this directly into xarray as a 1D Dataset.

import xarray as xr
ds_weights = xr.Dataset(df_weights)

Now we unstack it into a sparse array.

weights_sparse = ds_weights.unstack(sparse=True, fill_value=0.).weights

Here we can clearly see that this is a sparse matrix, mapping the input space (lat, lon) to the output space (SOVEREIGNT).

Perform Matrix Multiplication#

To regrid the data, we just have to multiply the original precip dataset by this matrix.

#regridded = xr.dot(ds[var], weights_sparse)
#regridded = sparse.einsum('ij,jk->ik', ds[var].data, weights_sparse.data)

Unfortunately, that doesn’t work out of the box, because sparse doesn’t implement einsum (see https://github.com/pydata/sparse/issues/31).

# regridded[0].compute()  # fails

Sparse does implement matmul, so we can use that. But we have to do some reshaping to make it work with our data.

def apply_weights_matmul_sparse(weights, data):

    assert isinstance(weights, sparse.SparseArray)
    assert isinstance(data, np.ndarray)
    data = sparse.COO.from_numpy(data)
    data_shape = data.shape
    # k = nlat * nlon
    n, k = data_shape[0], data_shape[1] * data_shape[2]
    data = data.reshape((n, k))
    weights_shape = weights.shape
    k_, m = weights_shape[0] * weights_shape[1], weights_shape[2]
    assert k == k_
    weights_data = weights.reshape((k, m))

    regridded = sparse.matmul(data, weights_data)
    assert regridded.shape == (n, m)
    return regridded.todense()

Variables at the same location on the grid can use the same weights#

#var = 'T2'  # 2m Temp, grid cell centers
#var = 'PREC_ACC_NC' # precip, grid cell centers
%%time
with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    var_regridded = xr.apply_ufunc(
        apply_weights_matmul_sparse,
        weights_sparse,
        ds[var],
        join="left",
        input_core_dims=[[y, x, region_name], [y, x]],
        output_core_dims=[[region_name]],
        dask="parallelized",
        dask_gufunc_kwargs=dict(meta=[np.ndarray((0,))])
    )
    

var_regridded.compute()
CPU times: user 3.34 s, sys: 311 ms, total: 3.65 s
Wall time: 22.3 s
<xarray.DataArray (time: 15336, name: 451)>
array([[4.22613292e+01, 2.90950405e+01, 1.00138637e+01, ...,
        1.92264791e+00, 6.52060319e+00, 2.13817739e+00],
       [1.87979162e-02, 1.18648805e-01, 7.38264858e-01, ...,
        1.75915492e+00, 2.59606560e+00, 1.39184965e-01],
       [3.64235453e+01, 3.67771132e+01, 1.24645243e+01, ...,
        4.19176092e+01, 4.02009541e+00, 1.60088584e+01],
       ...,
       [1.29928335e+01, 5.04678211e+01, 4.85013792e+01, ...,
        6.39075503e+01, 5.25906782e+01, 6.25619592e+01],
       [9.69441831e+00, 1.70217795e+01, 5.50870547e-02, ...,
        2.60921915e-01, 1.63261528e-04, 2.04692349e+00],
       [           nan,            nan,            nan, ...,
                   nan,            nan,            nan]])
Coordinates:
  * name     (name) object 'Alexauken Creek-Delaware River' ... 'Yaleville Br...
  * time     (time) datetime64[ns] 1979-10-01 1979-10-02 ... 2021-09-25

Explore timeseries data by region:#

Plot monthly-average time series for two selected HUC12s.

ds_var = var_regridded.sel(name=['Delaware Bay-Deep', 'Black Creek']).resample(time="MS").mean().to_dataset(region_name)
ds_var.hvplot(x='time', grid=True, frame_width=1000)

Explore the mean var by region#

Calculate the average value over all time steps for every HU12

df_mean = var_regridded.to_pandas().mean()
df_mean.name = var
df_mean = pd.DataFrame(df_mean).reset_index()
merged = pd.merge(regions_df, df_mean)

Convert back to geographic coordinates for plotting

crs_geo = 'EPSG:4326'
merged_geo = merged.to_crs(crs_geo)

Holoviz:

merged_geo.hvplot(c=var, geo=True, cmap='viridis_r', frame_width=600, tiles='StamenTerrain', 
               title='CONUS404', alpha=0.7)

Be a good citizen and shut down the cluster#

… Even though it would shut down in 15 minutes with no activity

cluster.shutdown()
/home/conda/users/ef214d1986de9477d5223419ba4838d76e9839b19e4fb02c9e208c97cab5a61b-20230329-131713-443303-165-pangeofu/lib/python3.10/site-packages/dask_gateway/client.py:1014: RuntimeWarning: coroutine 'rpc.close_rpc' was never awaited
  self.scheduler_comm.close_rpc()